iT邦幫忙

2023 iThome 鐵人賽

DAY 21
0

當事件越來越多的時候,每次查詢aggregate時,若從事件溯源(Event Sourcing)第1個開始一個個往後滾,會使查詢變慢,但Query查詢是映射過僅供查詢用的資料,不寫入時使用的model應該是DDD完整概念的aggregate,所以不能去查我們實作的DTO,而解決這效能問題的方式之一就是snapshot(快照)

在事件Repository裡實作 snapshot的相關method:

#[async_trait]
impl PersistedEventRepository for SurrealEventRepository {
    async fn get_snapshot<A: Aggregate>(&self, aggregate_id: &str)
        -> Result<Option<SerializedSnapshot>, PersistenceError> {
        let result  = self.db
            .select(("snapshots", aggregate_id))
            .await?;
        Ok(result)
    }
    
    async fn persist<A: Aggregate>(
        &self,
        events: &[SerializedEvent],
        snapshot_update: Option<(String, Value, usize)>
    ) -> Result<(), PersistenceError> {
        match snapshot_update {
            None => {
                for event in events {
                    let _: Vec<SerializedEvent> = self.db
                        .create("events")
                        .content(event)
                        .await?;
                }
            }
            Some((aggregate_id, aggregate, current_snapshot)) => {
                let mut current_sequence = 0;
                for event in events {
                    current_sequence = event.sequence;
                    let _s: Vec<SerializedEvent> = self.db
                        .create("events")
                        .content(event)
                        .await?;
                }
                let snapshot = SerializedSnapshot{
                    aggregate_id: aggregate_id.clone(),
                    aggregate,
                    current_sequence,
                    current_snapshot,
                };
                if current_snapshot == 1 {
                    let _s: Option<SerializedSnapshot> = self.db
                        .create(("snapshots", aggregate_id))
                        .content(snapshot)
                        .await?;
                } else {
                    let _s: Option<SerializedSnapshot> = self.db
                        .update(("snapshots", aggregate_id))
                        .content(snapshot)
                        .await?;
                }
            }
        }
        Ok(())
    }
}

在儲存事件時,加判斷是否為snapshot,若否(None)則只存事件,若是則存入snapshot,再判斷當前snapshot版本若為1則新增,若為2則更新。

其中snapshot_update的三個參數:

  • aggregate_id: aggregate 的 id
  • aggregate: 該 aggregate 序列化後的snapshot物件(JSON)。
  • current_snapshot: 快照版本。

上一篇
D20 test Reader Query
下一篇
D22 test snapshot
系列文
當rust 遇上 cqrs & es30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言